-
Notifications
You must be signed in to change notification settings - Fork 180
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Review Version] Refactor Protocol State with Pebble-based Storage #6197
[Review Version] Refactor Protocol State with Pebble-based Storage #6197
Conversation
@@ -383,7 +383,7 @@ func (builder *FlowAccessNodeBuilder) buildFollowerCore() *FlowAccessNodeBuilder | |||
builder.Component("follower core", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) { | |||
// create a finalizer that will handle updating the protocol | |||
// state when the follower detects newly finalized blocks | |||
final := finalizer.NewFinalizer(node.DB, node.Storage.Headers, builder.FollowerState, node.Tracer) | |||
final := finalizer.NewFinalizerPebble(node.DB, node.Storage.Headers, builder.FollowerState, node.Tracer) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's hard to use a flag for switching between pebble and badger. I had to make a copy of the finalizer, and use a different name for the pebble version of the finalizer.
Same for the collection finalizer
cmd/scaffold.go
Outdated
WithKeepL0InMemory(true). | ||
WithLogger(log). | ||
|
||
// the ValueLogFileSize option specifies how big the value of a | ||
// key-value pair is allowed to be saved into badger. | ||
// exceeding this limit, will fail with an error like this: | ||
// could not store data: Value with size <xxxx> exceeded 1073741824 limit | ||
// Maximum value size is 10G, needed by execution node | ||
// TODO: finding a better max value for each node type | ||
WithValueLogFileSize(128 << 23). | ||
WithValueLogMaxEntries(100000) // Default is 1000000 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We have these find-tuned config for badger, however, I didn't do any for pebble, just used default options in this PR.
The default pebble options works well for execution node in Mainnet Test Execution Node. But might be good to benchmark and find some config to twisk.
storage/pebble/operation/common.go
Outdated
type ReaderBatchWriter struct { | ||
db *pebble.DB | ||
batch *pebble.Batch | ||
callbacks []func() | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
badger supports transactions. However, pebble doesn't.
pebble only supports atomic batch updates. This is the key difference.
With badger transaction, we have a lot operations that create a badger transaction, then reads some data, and update some data depending on the result of the reads, and if the transaction is committed successfully, then we also use callback to update cache.
In order to implement a similar behavior as badger transaction in pebble, I create this ReaderBatchWriter struct. It contains the batch for updates. If we need to read something within the transaction, then the db
is be used to read data.
When reading the data, there are chances where we need to take into consideration the writes that has not been committed, this is why I made the IndexedBatch
method to return the Batch object, which holds all the write set.
And it also contains the callback that allows us to update cache after successfully committing the batch.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
⚠️
I think this is a highly unsafe construction for this refactoring, because it hides conceptually breaking changes for the business logic at a very low level. Further explanation:
- Previously, reads and writes within the same transaction all operated atomically on a snapshot of the data base. For example, I could read the latest finalized bock within a transaction and decide to write something depending on what I found for the latest finalized bock.
- The
ReaderBatchWriter
you are proposing here breaks this. I can read the latest finalized block and decide what I want to write in response, but by the time the write hits the data base, the latest finalized block might have already changed. While theReaderBatchWriter
exposes similar functionality as a badger transaction (reading and writing) it doesn't provide atomicity guarantees of reads at all. - what we need to do is to go through every piece of business logic that previously used a badger transaction, confirm that we can asynchronously do the reads upfront and then change the code.
- I think it would be much safer to not provide
ReaderBatchWriter
and force ourselves to re-write every single occurrence where we previously used a badger transaction. I think that it would be much much more likely for us to find edge cases when we force ourselves to look at each occurrence of badger transaction with reads and refactor it. It is important to be aware that human brains are just not very good at exhaustively finding all occurrences ofReaderBatchWriter
changes the guarantees for componentA
, which in turn changes guarantees for componentB
andC
.
From my perspective, we are doing a low-level change like this, so that the compiler is happy with the higher-level logic, yet guarantees for the higher-level logic have changed and we relying on humans to identify those areas where something could go wrong. I think an approach like this has disproportional risks to overlook bugs that we cannot afford in a project like Flow.
... maybe I change my mind, when I have reviewed more of your PR.
var upsert = insert | ||
var update = insert |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We are not able to support update
with pebble.
The update operation in badger will first check if the key exists, and only update if the key does not exist.
However, if we implement the similar logic in pebble, it won't be concurrent-safe anyway, because a read can be dirty without transaction.
For instance, if one operation is to update a key with value 10, and another operation is to update the same key with value 20. And if these operations happen concurrently, then both of them might see the key does not exist, and continue to set the value, and in the end, both operation would succeed, and the value will end up being either 10 or 20 depending on which lose the race.
That's why I decided to not implement update
and just use insert
everywhere, the insert will simply set the value regardless the key exists or not.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with removing upsert
and update
at the low level. 👍
Before merging, I think we should remove these aliases and remove any usages of the functions (I only found one, in TestUpsertEntry
).
storage/pebble/operation/common.go
Outdated
|
||
it, err := r.NewIter(&pebble.IterOptions{ | ||
LowerBound: prefix, | ||
UpperBound: append(prefix, ffBytes...), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The badger implementation maintains a global max
key, so that when iterating from a key range with a start prefix and end prefix, it uses the max key to create the boundary for including the biggest key with the end prefix. :
} else {
// for forward iteration, add the 0xff-bytes suffix to the end
// prefix, to ensure we include all keys with that prefix before
// finishing.
length := uint32(len(end))
diff := max - length
for i := uint32(0); i < diff; i++ {
end = append(end, 0xff)
}
}
I found a way to simplify it by getting rid of this max value completely.
I noticed when creating the key range, we are usually just iterating identifiers, such as iterating the child block IDs, or iterating the event IDs of a block, or iterating transaction results of a block. They are either flow.Identifier or uint32, which takes 32 bytes. So I could just make a ffBytes appended with 32 FF bytes, and use it as the UpperBound for the key range for the inclusion of any keys with the end
prefix.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
☠️ see my comment blow for details. Unfortunately, I think that change is unacceptable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree with the worries about this change. The 0xff
suffix approach to prefix-wise iteration depends on tracking database-wide key lengths to be correct. The Badger solution was a little hacky, but at least provided a fairly strong guarantee of correctness by forcing all writes through logic that tracked the max key length. Adding a constant number of bytes is less likely to be robust and correct over time because it does not account for changes to key lengths.
We have encountered very subtle bugs caused by not handling this correctly in the past. (The link on line 356 is supposed to link to this explanation of such a bug).
Change Suggestions
Pebble has an example for prefix-wise iteration in their documentation -- maybe we can use that approach here instead?
Alternatively, we could explicitly overshoot the endpoint of the iteration range, and exit early if we seek to a key with a prefix that doesn't match the supplied prefix.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
from the IterOptions
doc
// UpperBound specifies the largest key (exclusive) that the iterator will
// return during iteration. ...
I read this to mean UpperBound
needs to be the first non-matching value. effectively prefix+1
. it's probably easier and more accurate to omit upper bound and check if the iterator's current key starts with prefix
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the ideas, I made the changes in this PR in order to highlight the changes and the updated test cases.
it's probably easier and more accurate to omit upper bound and check if the iterator's current key starts with prefix.
This could work for traverse
, but won't work for iterate
which iterates keys between a start prefix and a end prefix. We would have to implement two solutions, whereas we could just use one solution for both traverse
and iterate
.
if err != nil { | ||
return fmt.Errorf("failed to insert chunk locator: %w", err) | ||
} | ||
|
||
// read the latest index | ||
var latest uint64 | ||
err = operation.RetrieveJobLatestIndex(JobQueueChunksQueue, &latest)(tx) | ||
err = operation.RetrieveJobLatestIndex(JobQueueChunksQueue, &latest)(r) | ||
if err != nil { | ||
return fmt.Errorf("failed to retrieve job index for chunk locator queue: %w", err) | ||
} | ||
|
||
// insert to the next index | ||
next := latest + 1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In order to prevent dirty reads of the latest
, we have to introduce a mutex lock (storing
) here, so that it's concurrent-safe.
result, err = p.results.byID(meta.ResultID)(batch) | ||
if errors.Is(err, storage.ErrNotFound) { | ||
// if the result is not in the previous blocks, check storage | ||
result, err = p.results.ByID(meta.ResultID) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When storing payloads, we require the receipt must refer to a result that exists. Given multiple receipts could refer to the same result, and we want the same result only be stored once, this makes this query a bit tricky: when verifying if the result has been stored, not long we need to look at the write-set in the current batch from previous writes, but also the database.
Badger does this two lookup automatically in a transaction, but pebble requires us to do these two queries separately.
05693a5
to
2dcc6f8
Compare
d322145
to
3905715
Compare
state/protocol/badger/state.go
Outdated
@@ -284,6 +283,7 @@ func (state *State) bootstrapSealingSegment(segment *flow.SealingSegment, head * | |||
if !ok { | |||
return fmt.Errorf("missing latest seal for sealing segment block (id=%s)", blockID) | |||
} | |||
fmt.Println("height =====", height, latestSealID) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
to remove
if err == nil { | ||
// QC for blockID already exists | ||
return storage.ErrAlreadyExists | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @zhangchiqing for putting this together! We had a call about this change. Here is the read.ai link for the call. As you review this PR, I request that you also think about the following:
This will help us determine how we want to roll this change out. ❤️ |
|
||
err := operation.InsertExecutedBlock(rootSeal.BlockID)(txn) | ||
err := operation.InsertExecutedBlock(rootSeal.BlockID)(w) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The badger version of Insert will first check if the key exists, and error if not exist.
However, the pebble version of Insert will just overwrite the key without checking whether the key exists.
This means, we can't detect or prevent block from being executed twice at the database level. We just have to rely on the application level to prevent that. In the worst case, executing the same block twice will not cause serious problem to the protocol as long as it happens rarely. So I think it's OK to skip the check.
@@ -479,7 +479,7 @@ func (s *state) UpdateHighestExecutedBlockIfHigher(ctx context.Context, header * | |||
defer span.End() | |||
} | |||
|
|||
return operation.RetryOnConflict(s.db.Update, procedure.UpdateHighestExecutedBlockIfHigher(header)) | |||
return operation.WithReaderBatchWriter(s.db, procedure.UpdateHighestExecutedBlockIfHigher(header)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This UpdateHighestExecutedBlockIfHigher
is no longer concurrent-safe, if two blocks at different height are executed concurrently, a dirty write might write with a wrong highest executed height.
But I think this is OK for the tradeoff of simplicity, because the highest height are only used by places where doesn't require accuracy, such as metrics or getting epoch counter, etc.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
⚠️
I disagree. While your argument might be true now, we want to write maintainable code - code that is correct in the future. I am not sure you will still remember this subtle edge case in 12 months from now -- I certainly won't.
So imagine there is an engineer who sees "highest executed block". Clearly, if that is correctly implemented this number can only ever go up. You argue that it is fine right now, but you are breaking the meaning of very clearly defined concepts in the code base. Adding subtle foot-guns here and there with the argument that it doesn't break anything now is exactly the strategy that turns a code base into a unmaintainable mess, where every little change breaks something at a very different part of the code base.
The complexity of our protocol is already high enough. For a high-assurance software we cannot afford leaving subtle edge cases in the code (where the algorithms behave factually incorrect) just with the argument that it doesn't break anything now.
In my opinion, this (and all similar occurrences) absolutely need to be fixed!
} | ||
// retrieve the current finalized cluster state boundary | ||
var boundary uint64 | ||
err = operation.RetrieveClusterFinalizedHeight(header.ChainID, &boundary)(f.db) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This read used to be in the transaction. However, with pebble, we have to read it before the batch write. So in theory there is chance for a dirty read if the MakeFinal is called concurrently. But since we know hotstuff finalizes block in a single thread, MakeFinal won't be called concurrently, which means the finalized height read from here won't become outdated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
⚠️
While I agree with your argument, I am strongly of the opinion that needs to be reflected in the code base. Our goal is to not leave subtle foot guns in the code base.
The collection finalizer is a standalone component. It doesn't even say anything about not being concurrency safe -- but in fact it is not (anymore). If the finalizer is only to be executed by a specific component, that requirement needs to be reflected in the code base, so it is hard (or at least very obvious to engineers) when they are breaking this requirement. At the moment, I would classify this as a subtle foot-gun, hence as unacceptable for our high-assurance software.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think addressing this (Leo's comment; my concern) could be its own PR. Added item to issue #6337 (comment)
} | ||
|
||
func (w *badgerWriterBatch) DeleteRange(start, end []byte) error { | ||
return fmt.Errorf("not implemented") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is to unify the API between badger.Transaction and pebble.Batch.
pebble.Batch has DeleteRange
, which badger.Transaction doesn't support.
storage/badger/commits.go
Outdated
@@ -70,6 +70,10 @@ func (c *Commits) BatchStore(blockID flow.Identifier, commit flow.StateCommitmen | |||
return operation.BatchIndexStateCommitment(blockID, commit)(writeBatch) | |||
} | |||
|
|||
func (c *Commits) BatchStore2(blockID flow.Identifier, commit flow.StateCommitment, tx storage.BatchWriter) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
to be removed
@@ -7,9 +7,6 @@ import ( | |||
// Guarantees represents persistent storage for collection guarantees. | |||
type Guarantees interface { | |||
|
|||
// Store inserts the collection guarantee. | |||
Store(guarantee *flow.CollectionGuarantee) error |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A guarantee is never and should not stored alone. Usually it's stored along with the block. Removing the method from the interface level, but keep the method in the module implementation.
This comment was marked as resolved.
This comment was marked as resolved.
Sorry, something went wrong.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And later, once they finalize it, they add the CollectionGuarantee to their data base and send that guarantee to the Consensus nodes.
A collection guarantee is compound model derived from a cluster block, basically a cluster block QC and block payload. Collection Nodes store this data as HotStuff models (blocks, payloads, QCs), from which they can derive a collection guarantee at any time.
In the same way that Consensus Nodes don't need to separately store a "finalized block" in their database when they finalize a block, Collection Nodes don't need to separately store a collection guarantee once it is guaranteed.
In the context of the Consensus Follower, Leo is right that we do not want to store parts of a block independently; we store either the whole block and all its contents or nothing. So this change makes sense to me.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, my point is that collection nodes currently don't store guarantee, but builds them on the fly (see collection finalizer) and submit it to consensus nodes (see collection pusher). And consensus nodes also don't store the guarantee when receiving them, but save them in the mempool ( see consensus ingestion core).
The consensus nodes and other non-consensus nodes only store the guarantee when storing the block payload, which is implemented by Block.Store. That's why I removed the Store method on the Guarantees interface, for the same reason, I also removed Payloads.Store
. This guarantees that if the database ever has a guarantee stored, then the block containing this guarantee must also exist in the database, since block and guarantee are stored atomically, and this is the only way to store guarantee.
indexOwnReceiptOps := transaction.WithTx(func(tx *badger.Txn) error { | ||
err := operation.IndexOwnExecutionReceipt(blockID, receiptID)(tx) | ||
// check if we are storing same receipt | ||
if errors.Is(err, storage.ErrAlreadyExists) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can't prevent dirty reads without transaction, have to skip this check.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
⚠️
Are you sure that is a wise idea? You are the expert, but if I remember correctly we previously had execution nodes crash telling us that they were attempting to override their own result with a different one. With this check removed, they wouldn't notice that anymore.
I think we have options here:
- Wouldn't including a mutex suffice to prevent dirty reads? Then we can check first, because no other components would be able to write (ideally, we would move all the write operations related to ExecutionReceipts into this package not not export them.
func convertNotFoundError(err error) error { | ||
if errors.Is(err, pebble.ErrNotFound) { | ||
return storage.ErrNotFound | ||
} | ||
return err | ||
} | ||
|
||
// O(N) performance |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's hard to implement a general reverse iteration with pebble. Instead, I just iterate all keys and find the highest. For now, this function is only used by looking for the highest version beacon , which usually have very few.
storage/pebble/procedure/children.go
Outdated
} | ||
|
||
return nil | ||
// TODO: use transaction to avoid race condition |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We are relying on the consensus algorithm to ensure the method is called in a single thread, which it does right now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
regarding your comment I think using a transaction provides inadequate guarantees for concurrent access. Please see my comment.
In my opinion, the only way to make this code safe is to delegate the sole authority for writing those indices to one single component. Consequently, this code here should be part of that higher-level component and not exported.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
⚠️
We are relying on the consensus algorithm to ensure the method is called in a single thread, which it does right now.
I am not sure that is correct ... at least it is not apparent, so we have notable surface to accidentally break this in the future:
- The compliance engine calls
State.Extend
flow-go/engine/consensus/compliance/core.go
Line 372 in 51969ec
err = c.state.Extend(ctx, block) - The consensus builder is calling
State.Extend
flow-go/module/builder/consensus/builder.go
Line 147 in 51969ec
err = b.state.Extend(ctx, proposal)
Compliance engine and Block builder have different worker threads I think. Both call Extend
, which in turn calls Mutator.insert
, which calls IndexNewBlock
. So we already have concurrent access in my opinion!
@@ -14,6 +14,9 @@ type QuorumCertificates interface { | |||
// StoreTx stores a Quorum Certificate as part of database transaction QC is indexed by QC.BlockID. | |||
// * storage.ErrAlreadyExists if any QC for blockID is already stored | |||
StoreTx(qc *flow.QuorumCertificate) func(*transaction.Tx) error | |||
|
|||
// * storage.ErrAlreadyExists if any QC for blockID is already stored | |||
StorePebble(qc *flow.QuorumCertificate) func(PebbleReaderBatchWriter) error |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's hard to unify the Store function for both badger and pebble. Because badger needs to support transaction and pebble needs to support batch update. And the API for transaction and batch update are different.
So I ended up create a new method StorePebble
. badger will not implement StorePebble
, and pebble will not implement StoreTx
.
599e713
to
e87084a
Compare
Just passed benchnet testing with 20tps benchmarking. |
|
||
type Transaction interface { | ||
// TODO: rename to writer | ||
type BatchWriter interface { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To me, the differentiation between BatchWriter
and Transaction
from Badger doesn't make sense to carry over to Pebble.
Besides IndexedBatch
s, which it seems like we can mostly avoid using, all write operations to Pebble are equivalent to the WriteBatch
API from Badger. With Pebble, there is no distinction between batch-inserting a resource, and "normal"-inserting a resource.
I think we should:
- remove low-level storage operations that use the
Batch*
name scheme (example)- if applicable (no non-batch method already exists), modify these functions to accept
pebble.Writer
instead ofstorage.BatchWriter
- if applicable (no non-batch method already exists), modify these functions to accept
- remove the
BatchWriter
type
func findHighestAtOrBelow( | ||
prefix []byte, | ||
height uint64, | ||
entity interface{}, | ||
) func(*badger.Txn) error { | ||
return func(tx *badger.Txn) error { | ||
) func(pebble.Reader) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's only used for finding the latest version beacon.
store heights in 1's compliment? then you can do a forward search.
Yeah that would work. We could also just index the latest version beacon (latest_beacon->beacon_id
) when we index height->beacon_id
. Same thing we do for latest finalized block.
} | ||
|
||
secretsDB, err := bstorage.InitSecret(opts) | ||
secretsDB, err := bstorage.InitSecret(fnb.BaseConfig.secretsdir, nil) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the secrets database contains private Staking and Random Beacon keys
Agree we should keep secrets DB encrypted. Just noting only Random Beacon keys are stored there (not Staking keys)
DeleteRange(start, end []byte) error | ||
} | ||
|
||
type Reader interface { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems like this is unused. Can we use pebble.Reader
instead of this interface?
} | ||
|
||
type Reader interface { | ||
Get(key []byte) ([]byte, error) | ||
} | ||
|
||
// BatchStorage serves as an abstraction over batch storage, adding ability to add ability to add extra | ||
// callbacks which fire after the batch is successfully flushed. | ||
type BatchStorage interface { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In general I feel like we can significantly trim down the number of these high-level generic storage interfaces. Ideally, I think we should have only:
Reader
- read-only access to the committed state of the database (possibly we can usepebble.Reader
directly in all cases)BatchWriter
- apebble.WriteBatch
wrapped with ourOnSucceed
callback tracking- This can replace both Badger concepts of
*Transaction
and*WriteBatch
- Ideally we would not provide read access at all here, IMO. Though practically, to make the migration feasible, we need to provide read access. The isolation level of the reads will different, which is a major source of potential bugs. In addition to auditing our previous use of read+write badger transactions, I think we should name and document this
Read
API to make it very clear that it is reading global state.
- This can replace both Badger concepts of
I am guessing that some of these interfaces are necessary to make the migration process manageable, even if they won't be desirable once we complete the transition. My suggestion for this stage is:
- Agree on the high-level storage interfaces we want to use, once the migration is complete. Define these in the code and make use of them as much as possible during the migration.
- Mark "legacy" interfaces that are needed during the migration process as deprecated and add TODOs to remove them
- Remove any left-over interfaces
"github.com/onflow/flow-go/storage" | ||
) | ||
|
||
// batchWriter wraps the storage.BatchWriter to make it compatible with pebble.Writer |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems like this is a temporary crutch to get us through the migration, but would not be needed after the migration is complete. In particular because, with Pebble, there is no need to distinguish between "transactions" and "write batches".
If I'm understanding this correctly, could we mark these types and functions as deprecated and document why?
return val, nil | ||
} | ||
|
||
func (b *Batch) GetReader() storage.Reader { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This function is never called - can we remove it?
return func(tx *badger.Txn) error { | ||
err := operation.IndexResultApproval(resultID, chunkIndex, approvalID)(tx) | ||
func (r *ResultApprovals) index(resultID flow.Identifier, chunkIndex uint64, approvalID flow.Identifier) func(storage.PebbleReaderBatchWriter) error { | ||
return func(tx storage.PebbleReaderBatchWriter) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return func(tx storage.PebbleReaderBatchWriter) error { | |
return storage.OnlyWriter(operation.IndexResultApproval(resultID, chunkIndex, approvalID)) |
ErrAlreadyExists
is no longer returned here, so we could just do this. To replicate the previous sanity check in full, we'd need a lock here.
// NOTE: SN nodes need to explicitly set --insecure-secrets-db to true in order to | ||
// disable secrets database encryption | ||
if fnb.NodeRole == flow.RoleConsensus.String() && fnb.InsecureSecretsDB { | ||
fnb.Logger.Warn().Msg("starting with secrets database encryption disabled") | ||
} else { | ||
encryptionKey, err := loadSecretsEncryptionKey(fnb.BootstrapDir, fnb.NodeID) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree. Unfortunately Pebble doesn't support encryption from what I could tell. We would need to implement it on top of Pebble. (Or maybe just keep using Badger for only the secrets DB -- it's pretty tiny.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Summary of High Level Feedback & Suggestions
Mutually Exclusive Procedures (Isolation)
The difference in isolation levels between Badger and Pebble is the main source of potential bugs during this migration. In addition to finding and re-implementing instances where we rely on Badger's isolation level, I think we should separate and document low-level storage operations that need to be mutex-protected at the higher level:
- For individual storage operations (eg. read/write one key) that are accessed in a mutex-protected higher-level component:
- Document the mutex requirement
- If possible, make them package-private (or
internal
to thestorage/pebble
subtree
- Consolidate mutex-protected compound storage operations (multiple reads and writes, which together must be mutex-protected).
- I think re-purposing the
procedure
package, so it (only) contains all these compound, mutex-protected storage methods would make sense
- I think re-purposing the
High-Level Storage API
Some of the high-level storage constructs we have don't make sense with a Pebble-backed storage implementation. In particular:
- we do not need a distinction between "write batch" and "transaction"
- we should avoid compound storage operations that intermingle reads and writes
Suggestions
- Remove usage of
IndexedBatch
(see [Review Version] Refactor Protocol State with Pebble-based Storage #6197 (comment) and [Review Version] Refactor Protocol State with Pebble-based Storage #6197 (comment)) - Simplify high-level storage APIs (see [Review Version] Refactor Protocol State with Pebble-based Storage #6197 (comment) and [Review Version] Refactor Protocol State with Pebble-based Storage #6197 (comment))
- One "writer" interface backed by
WriteBatch
- One global "reader" interface backed by
pebble.DB
- If we need to keep additional interfaces to simplify the migration process, mark them as deprecated.
- One "writer" interface backed by
Database Encryption
Pebble does not support encryption. I don't think we should ship a version that reverts encryption of the secrets database by default. We can:
- Implement an encryption layer for storage methods operating on the secrets DB (my preference)
- Continue using Badger only the secrets DB
Iterative Badger-Based Migration
I'm in favour of @AlexHentschel's suggestion to complete the re-implementation of storage methods that depend on Badger's isolation level iteratively, retaining Badger as the storage backend until our storage implementation is fully safe with a read-committed isolation level.
if errors.Is(err, storage.ErrAlreadyExists) { | ||
// some evidence about execution fork already stored; | ||
// we only keep the first evidence => noting more to do |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
func (t *Transaction) Set(key, value []byte) error { | ||
return t.writer.Set(key, value, pebble.Sync) | ||
} | ||
|
||
func (t *Transaction) Delete(key []byte) error { | ||
return t.writer.Delete(key, pebble.Sync) | ||
} | ||
|
||
func (t *Transaction) DeleteRange(start, end []byte) error { | ||
return t.writer.DeleteRange(start, end, pebble.Sync) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
regarding the argument pebble.Sync
that you are passing in: I noticed that none of Pebble's implementations of Batch.Set
, or Batch.Delete
or Batch.DeleteRange
actually use this last value (its just ignored).
func (b *Batch) GetWriter() storage.BatchWriter { | ||
return &Transaction{b.writer} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To me, it would make sense if our Batch
object here would directly implement the BatchWriter
interface.
type reader struct { | ||
db *pebble.DB | ||
} | ||
|
||
func (r *reader) Get(key []byte) ([]byte, error) { | ||
val, closer, err := r.db.Get(key) | ||
if err != nil { | ||
return nil, err | ||
} | ||
defer closer.Close() | ||
return val, nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the spirit of defensive programming, I think it would help if Batch
provided no abilities to read the state. If the only functionality you have is to read directly from the data base while Batch
is an independent object where you put deferred write operations into, I think it would be relatively clear that reads are not reflecting the latest values as of the pending writes.
Otherwise, I think it is very dangerous to confuse, because Pebble also provides an indexed batch (see pebble's NewIndexedBatch
), which we are not using here. And even with an indexed Batch, reads could be stale unless the value is overwritten within the batch. I think the surface for possible problems and foot guns is very high with the current implementation offering also reads, while the extra convenience is probably quite little. The tradeoff is not worth it in my oppionion
type reader struct { | |
db *pebble.DB | |
} | |
func (r *reader) Get(key []byte) ([]byte, error) { | |
val, closer, err := r.db.Get(key) | |
if err != nil { | |
return nil, err | |
} | |
defer closer.Close() | |
return val, nil | |
} |
See also Jordan's comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like the Finalizer
for the main consensus:
- It can be concurrently called and it determines the ordered list of blocks that still require finalization. If it runs on a stale state, it will just have extra blocks in this list which are already finalized.
- The core logic for writing to the data base, which must be protected from accidental stale writes, is delegated to the lower level Protocol State:
err = f.state.Finalize(ctx, pendingID)
I think using a similar construction here would be beneficial. In addition, this would make it much easier to consolidate the code for the collectors and consensus finalization, which is algorithmically largely the same - it just deviates what we index upon finalization and the actions we take after successful finalization (mempool cleanup and publishing collection). The consensus implementation has already abstrations for this custom logic, so I think it would be relatively easy to consolidate both implementations.
err = operation.WithReaderBatchWriter(m.db, func(rw storage.PebbleReaderBatchWriter) error { | ||
_, tx := rw.ReaderWriter() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
⚠️
We need to protect these lines from accidental concurrent access:
flow-go/state/protocol/pebble/mutator.go
Lines 713 to 720 in 51969ec
err = operation.UpdateFinalizedHeight(header.Height)(tx) | |
if err != nil { | |
return fmt.Errorf("could not update finalized height: %w", err) | |
} | |
err = operation.UpdateSealedHeight(sealed.Height)(tx) | |
if err != nil { | |
return fmt.Errorf("could not update sealed height: %w", err) | |
} |
The previous badger-based code would just fail at
IndexBlockHeight
right before, because badger checked that there was no value stored for the respective height. Now we will just overwrite the value and possibly decrement the latest finalized block height with an outdated value (thereby breaking the protocol state).
state/protocol/pebble/mutator.go
Outdated
if err != nil { | ||
return fmt.Errorf("could not index candidate seal: %w", err) | ||
} | ||
|
||
// index the child block for recovery | ||
err = transaction.WithTx(procedure.IndexNewBlock(blockID, candidate.Header.ParentID))(tx) | ||
err = procedure.IndexNewBlock(blockID, candidate.Header.ParentID)(tx) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
⚠️
The method Extend
is called concurrently by the compliance engine (for incoming blocks from other replicas) and the node's internal hotstuff (persisting its own proposal). Extend
calls insert
here. The function IndexNewBlock
is not sufficiently concurrency safe, as it modifies the list of all children of a block, which could very well cause conflicting edits, i.e. loss of information and the corruption of the protocol state.
return transaction.WithTx(operation.InsertQuorumCertificate(qc)) | ||
func NewQuorumCertificates(collector module.CacheMetrics, db *pebble.DB, cacheSize uint) *QuorumCertificates { | ||
store := func(_ flow.Identifier, qc *flow.QuorumCertificate) func(storage.PebbleReaderBatchWriter) error { | ||
return storage.OnlyWriter(operation.InsertQuorumCertificate(qc)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
⚠️
Previously, badger guaranteed that we would only ever write one QC for a block into the database. In other words, a QC for a block that was stored in the database would never change.
This now changes:
If operation.InsertQuorumCertificate
is ever called outside of the QuorumCertificates
storage abstraction layer, we are risking to update (overwrite) the QC for a block. The reason is that you included a mutex here, which only protects dirty writes mediated by QuorumCertificates
, but the lower-level operation.InsertQuorumCertificate
remains easily accessible for any logic.
I think the way we are using QCs, overwriting the QC would be fine. We only require a valid QC, but my gut feeling is that it doesn't need to be consistently the same. Nevertheless, this requires careful confirmation and proper documentation.
@@ -30,7 +31,7 @@ type IndexerCore struct { | |||
collections storage.Collections | |||
transactions storage.Transactions | |||
results storage.LightTransactionResults | |||
batcher bstorage.BatchBuilder | |||
batcher *pebble.DB |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
while we are at it, could we rename this to database
. Given the following line, I have a hard time making sense of the naming here:
batch := bstorage.NewBatch(c.batcher) |
…pproval [Pebble Refactor] Making indexing approval concurrent-safe
[Pebble Refactor] Making finalization concurrent safe
…ndexer [Pebble Storage] Refactor indexing new blocks
Co-authored-by: Peter Argue <[email protected]>
…ipts [Pebble Storage] Make indexing own receipts concurrent safe
[Pebble storage] Refactor key iteration
Close for now. We will start with the database operation abstraction, and then replace badger transactions operations with batch updates so that database operations could be database-implementation agnostic, and lastly replacing badger with pebble. |
Close #6137
This PR refactor the protocol state from badger-based storage to pebble-based storage.
Since a lot of code are copied, in order to make it easy to see the actual code-diff, I added a commit that copies all the badger operations to the pebble folder, and created a base branch (
leo/v0.33-pebble-base
). And by comparing against this base branch, we can see the actual code-diff clearly.That's why I called this PR "Review Version".
How to keep this branch update to date
If there is updates on
v0.33
branch, we should first rebaseleo/v0.33-pebble-base
withv0.33
, and then rebaseleo/v0.33-pebble-storage-to-review
withleo/v0.33-pebble-base